package com.dmp.util

import java.util
import java.util.Properties

import com.dmp.config.ConfigHandler
import com.oracle.beans.{Rpt, RptArea, RptCode}
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

/**
  * Report helpers that aggregate per-row KPI vectors by a dimension column and persist the
  * totals, either to a JDBC table or to a text file.
  *
  * Every row is turned into a fixed nine-element KPI vector by [[RptKpi.apply]]; vectors that
  * share the same dimension key are then summed position by position.
  */
object RptKpi {

  /** Adds two KPI vectors position by position (shared reducer for all reports). */
  private def sumKpis(a: List[Double], b: List[Double]): List[Double] =
    a.zip(b).map { case (x, y) => x + y }

  /**
    * Groups the rows by a string-valued column, sums the KPI vectors per group and overwrites
    * the JDBC table `dbName` with one `Rpt` record per group.
    *
    * @param dataFrame  rows to aggregate; must contain every column read by [[RptKpi.apply]]
    * @param sQLContext context whose implicits provide the RDD-to-DataFrame conversion
    * @param Va         name of the string column used as the grouping key
    * @param dbName     passed as the table argument of the JDBC writer
    */
  def makeCommonDataFrame(dataFrame: DataFrame, sQLContext: SQLContext, Va: String, dbName: String): Unit = {
    import sQLContext.implicits._

    dataFrame
      // Pair each row's dimension value with its KPI vector.
      .map(row => (row.getAs[String](Va), RptKpi(row)))
      .reduceByKey((a, b) => sumKpis(a, b))
      // The first seven totals are counters (converted to Int); the last two stay Double.
      .map { case (key, kpis) =>
        Rpt(
          key,
          kpis(0).toInt,
          kpis(1).toInt,
          kpis(2).toInt,
          kpis(3).toInt,
          kpis(4).toInt,
          kpis(5).toInt,
          kpis(6).toInt,
          kpis(7),
          kpis(8)
        )
      }
      .toDF()
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(ConfigHandler.url, dbName, ConfigHandler.prop)
  }

  /**
    * Computes the nine-element KPI vector of a single row.
    *
    * Positions (flags are 1 when the condition holds, otherwise 0):
    *  - 0: `requestmode == 1 && processnode >= 1`
    *  - 1: `requestmode == 1 && processnode >= 2`
    *  - 2: `requestmode == 1 && processnode == 3`
    *  - 3: `iseffective == 1 && isbilling == 1 && isbid == 1 && adorderid != 0`
    *  - 4: `iseffective == 1 && isbilling == 1 && iswin == 1`
    *  - 5: `requestmode == 2 && iseffective == 1`
    *  - 6: `requestmode == 3 && iseffective == 1`
    *  - 7: `adpayment / 1000` when the condition of position 4 holds, otherwise 0
    *  - 8: `winprice / 1000` when the condition of position 4 holds, otherwise 0
    *
    * @param row a row exposing the integer columns `requestmode`, `processnode`, `iseffective`,
    *            `isbilling`, `isbid`, `iswin`, `adorderid` and the double columns `winprice`
    *            and `adpayment`
    * @return the KPI vector; the integer flags are widened to Double so that all positions can
    *         be summed uniformly
    */
  def apply(row: Row): List[Double] = {
    def flag(cond: Boolean): Int = if (cond) 1 else 0

    // Columns the KPI conditions are based on.
    val reqMode = row.getAs[Int]("requestmode")
    val proNode = row.getAs[Int]("processnode")
    val effTive = row.getAs[Int]("iseffective")
    val billing = row.getAs[Int]("isbilling")
    val isbid = row.getAs[Int]("isbid")
    val iswin = row.getAs[Int]("iswin")
    val adOrderId = row.getAs[Int]("adorderid")

    val rawReq = flag(reqMode == 1 && proNode >= 1)
    val effReq = flag(reqMode == 1 && proNode >= 2)
    val adReq = flag(reqMode == 1 && proNode == 3)

    val adRtbReq = flag(effTive == 1 && billing == 1 && isbid == 1 && adOrderId != 0)

    // The price columns are only read for rows that satisfy the win condition.
    val (adSuccRtb, adPaymentShare, winPriceShare) =
      if (effTive == 1 && billing == 1 && iswin == 1) {
        val winPrice = row.getAs[Double]("winprice")
        val adPayment = row.getAs[Double]("adpayment")
        (1, adPayment / 1000, winPrice / 1000)
      } else (0, 0d, 0d)

    val adShow = flag(reqMode == 2 && effTive == 1)
    val adClick = flag(reqMode == 3 && effTive == 1)

    List[Double](rawReq, effReq, adReq, adRtbReq, adSuccRtb, adShow, adClick, adPaymentShare, winPriceShare)
  }

  /**
    * Same aggregation as [[makeCommonDataFrame]], but for a dimension stored as an integer code
    * that has to be translated into a display name (e.g. code 1 stands for one name, code 2
    * for another). The totals are written as `RptCode` records, overwriting table `dbName`.
    *
    * @param dataFrame  rows to aggregate; must contain every column read by [[RptKpi.apply]]
    * @param sQLContext context whose implicits provide the RDD-to-DataFrame conversion
    * @param dbName     passed as the table argument of the JDBC writer
    * @param clName     name of the integer column used as the grouping key
    * @param name       lookup from the code (as a decimal string) to its display value; a code
    *                   without an entry is reported under its own string form instead of
    *                   failing the job with a NullPointerException
    */
  def areaRportParameterInt(dataFrame: DataFrame, sQLContext: SQLContext, dbName: String, clName: String, name: util.Map[String, AnyRef]): Unit = {
    import sQLContext.implicits._

    dataFrame
      // Pair each row's integer code with its KPI vector.
      .map(row => (row.getAs[Int](clName), RptKpi(row)))
      .reduceByKey((a, b) => sumKpis(a, b))
      .map { case (code, kpis) =>
        val codeKey = code.toString
        // java.util.Map#get returns null for an unknown key, so wrap the lookup in Option.
        val label = Option(name.get(codeKey)).map(_.toString).getOrElse(codeKey)
        RptCode(
          label,
          kpis(0).toInt,
          kpis(1).toInt,
          kpis(2).toInt,
          kpis(3).toInt,
          kpis(4).toInt,
          kpis(5).toInt,
          kpis(6).toInt,
          kpis(7),
          kpis(8)
        )
      }
      .toDF()
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(ConfigHandler.url, dbName, ConfigHandler.prop)
  }

  /**
    * Aggregates the KPI vectors per application name and saves the result as a text file,
    * ordered by the first KPI in descending order.
    *
    * When a row has no application name, the name is looked up by application id in the
    * dictionary file at `ConfigHandler.appdictFilePath` (tab-separated; lines with fewer than
    * five fields are skipped; field 4 is the lookup key and field 1 the value).
    *
    * NOTE: this method stops `sc` after a successful save, so the context cannot be reused by
    * the caller afterwards.
    *
    * @param sc        context used to read the dictionary file and to broadcast it
    * @param dataFrame rows to aggregate; must contain every column read by [[RptKpi.apply]]
    * @param appname   name of the string column holding the application name
    * @param appid     name of the string column holding the application id
    * @param savePath  output path handed to `saveAsTextFile`
    */
  def AppAnalysisCore(sc: SparkContext, dataFrame: DataFrame, appname: String, appid: String, savePath: String): Unit = {
    // Load the dictionary file as (field 4 -> field 1) pairs.
    val appDict = sc.textFile(ConfigHandler.appdictFilePath)
      .map(_.split("\t", -1))
      .filter(_.length >= 5)
      .map(fields => (fields(4), fields(1)))
      .collectAsMap()

    // The broadcast holds the snapshot collected above; it is not refreshed afterwards.
    val appDictBroadcast = sc.broadcast(appDict)

    dataFrame
      .map { row =>
        // Dimension columns.
        val rawName = row.getAs[String](appname)
        val appId = row.getAs[String](appid)

        // Resolution order: explicit name, dictionary entry for the id, the id itself, "未知".
        val resolvedName =
          if (StringUtils.isNotEmpty(rawName)) rawName
          else if (StringUtils.isNotEmpty(appId)) appDictBroadcast.value.getOrElse(appId, appId)
          else "未知"

        (resolvedName, RptKpi(row))
      }
      .reduceByKey((a, b) => sumKpis(a, b))
      .sortBy(tp => tp._2.head, ascending = false)
      .saveAsTextFile(savePath)

    sc.stop()
  }
}
